{
  "metadata" : {
    "id" : "75765162-2d2f-4070-b5b4-9818cac60853",
    "name" : "Structured-Streaming-in-action",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-sql-kafka-0-10 % 2.3.0" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "6E2995E02B244E978E1327B7A60484F0"
    },
    "cell_type" : "markdown",
    "source" : "# Structured Streaming - Kafka Example\n\nThe intention of this example is to explore the main aspects of the Structured Streaming API.\n\n - We will the Kafka `source` to consume the `iot-data` topic.\n - We use a file `sink` to store the data into a _Parquet_ file."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "3B783C2DA5A9409E85DF2CE7F061AECB"
    },
    "cell_type" : "code",
    "source" : [ "import java.io.File\n", "val topic = \"iot-data\"\n", "val workDir = \"/tmp/streaming-with-spark\"\n", "val referenceFile = \"sensor-records.parquet\"\n", "val targetFile = \"structured_enrichedIoTStream.parquet\"\n", "val targetPath = new File(workDir, targetFile).getAbsolutePath\n", "val unknownSensorsTargetFile = \"unknownSensorsStream.parquet\"\n", "val unknownSensorsTargetPath = new File(workDir, unknownSensorsTargetFile).getAbsolutePath\n", "val kafkaBootstrapServer = \"127.0.0.1:9092\"" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import java.io.File\ntopic: String = iot-data\nworkDir: String = /tmp/streaming-with-spark\nreferenceFile: String = sensor-records.parquet\ntargetFile: String = structured_enrichedIoTStream.parquet\ntargetPath: String = /tmp/streaming-with-spark/structured_enrichedIoTStream.parquet\nunknownSensorsTargetFile: String = unknownSensorsStream.parquet\nunknownSensorsTargetPath: String = /tmp/streaming-with-spark/unknownSensorsStream.parquet\nkafkaBootstrapServer: String = 127.0.0.1:9092\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 1,
      "time" : "Took: 1.076s, at 2019-03-02 20:46"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "3225F1A3939642B087F1BE6A37EB9D03"
    },
    "cell_type" : "code",
    "source" : [ "val rawData = sparkSession.readStream\n", "      .format(\"kafka\")\n", "      .option(\"kafka.bootstrap.servers\", kafkaBootstrapServer)\n", "      .option(\"subscribe\", topic)\n", "      .option(\"enable.auto.commit\", true)\n", "      .option(\"group.id\", \"iot-data-consumer\")\n", "      .option(\"startingOffsets\", \"earliest\")\n", "      .load()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "rawData: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 3,
      "time" : "Took: 0.972s, at 2017-08-09 18:00"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "497CEEFAB7DF40F884CC7A8139C3DA5F"
    },
    "cell_type" : "code",
    "source" : [ "rawData.isStreaming" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "res7: Boolean = true\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : "true"
      },
      "output_type" : "execute_result",
      "execution_count" : 5,
      "time" : "Took: 0.841s, at 2017-08-09 18:00"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A74BF086DCC240168F21E57797088678"
    },
    "cell_type" : "code",
    "source" : [ "rawData.printSchema()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "root\n |-- key: binary (nullable = true)\n |-- value: binary (nullable = true)\n |-- topic: string (nullable = true)\n |-- partition: integer (nullable = true)\n |-- offset: long (nullable = true)\n |-- timestamp: timestamp (nullable = true)\n |-- timestampType: integer (nullable = true)\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 9,
      "time" : "Took: 0.764s, at 2017-08-09 14:33"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A39D7FB1A9AC496F8DFC7502EB0A4C29"
    },
    "cell_type" : "code",
    "source" : [ "case class SensorData(sensorId: Int, timestamp: Long, value: Double)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "defined class SensorData\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 10,
      "time" : "Took: 0.541s, at 2017-08-09 14:33"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "6E729EE3495E4AA88FC9E347BDEE3210"
    },
    "cell_type" : "code",
    "source" : [ "val iotData = rawData.select(\"value\").as[String].map{r =>\n", "  val Array(id, timestamp, value) = r.split(\",\")\n", "  SensorData(id.toInt, timestamp.toLong, value.toDouble)\n", "}" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "iotData: org.apache.spark.sql.Dataset[SensorData] = [sensorId: int, timestamp: bigint ... 1 more field]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 11,
      "time" : "Took: 0.799s, at 2017-08-09 14:33"
    } ]
  }, {
    "metadata" : {
      "id" : "CD174F6E3EF1475AA51899B5931B7415"
    },
    "cell_type" : "markdown",
    "source" : "## Load the reference data from a parquet file¶\nWe also cache the data to keep it in memory and improve the performance of our steaming application"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0314D4CF7E88448EB2A5BDBED5839282"
    },
    "cell_type" : "code",
    "source" : [ "val sensorRef = sparkSession.read.parquet(s\"$workDir/$referenceFile\")\n", "sensorRef.cache()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/learningsparkstreaming/sensor-records.parquet;\n  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:360)\n  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)\n  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)\n  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)\n  at scala.collection.immutable.List.foreach(List.scala:381)\n  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)\n  at scala.collection.immutable.List.flatMap(List.scala:344)\n  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)\n  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)\n  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:559)\n  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:543)\n  ... 63 elided\n"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "967DA9CD00034B93A9F9077ACCABBF69"
    },
    "cell_type" : "code",
    "source" : [ "val query = iotData.writeStream\n", "  .outputMode(\"append\")\n", "  .format(\"parquet\")\n", "  .option(\"path\", workDir)\n", "  .option(\"checkpointLocation\", \"/tmp/checkpoint\")\n", "  .start()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "queryDef: org.apache.spark.sql.streaming.DataStreamWriter[SensorData] = org.apache.spark.sql.streaming.DataStreamWriter@58df37bb\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 27,
      "time" : "Took: 0.430s, at 2017-08-09 13:58"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "presentation" : {
        "tabs_state" : "{\n  \"tab_id\": \"#tab636027600-0\"\n}",
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}"
      },
      "id" : "C60D42C6AA7C43108AF52A4EEA7B407E"
    },
    "cell_type" : "code",
    "source" : [ "\n", "query.recentProgress" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "res38: Array[org.apache.spark.sql.streaming.StreamingQueryProgress] =\nArray({\n  \"id\" : \"ce29c1eb-bebc-45cb-abd8-3e6437c16518\",\n  \"runId\" : \"dcfd946a-097a-41c1-b810-f848baaddc10\",\n  \"name\" : null,\n  \"timestamp\" : \"2017-08-09T11:52:44.721Z\",\n  \"numInputRows\" : 21421,\n  \"processedRowsPerSecond\" : 56819.62864721486,\n  \"durationMs\" : {\n    \"addBatch\" : 236,\n    \"getBatch\" : 4,\n    \"getOffset\" : 109,\n    \"queryPlanning\" : 2,\n    \"triggerExecution\" : 377,\n    \"walCommit\" : 13\n  },\n  \"stateOperators\" : [ ],\n  \"sources\" : [ {\n    \"description\" : \"KafkaSource[Subscribe[iot-data]]\",\n    \"startOffset\" : {\n      \"iot-data\" : {\n        \"0\" : 3468807\n      }\n    },\n    \"endOffset\" : {\n      \"iot-data\" : {\n        \"0\" : 3490228\n      }\n    },\n    \"numInputRows\" : 21421,\n    \"processedRowsPerSecond\" : 5..."
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : "<div>\n      <script data-this=\"{&quot;dataId&quot;:&quot;anon65387c69018f4a2f725ea09a1c65bfc6&quot;,&quot;dataInit&quot;:[],&quot;genId&quot;:&quot;636027600&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tabs'], \n      function(playground, _magictabs) {\n        // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n        // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n        playground.call(data,\n                        this\n                        ,\n                        {\n    \"f\": _magictabs,\n    \"o\": {}\n  }\n  \n                        \n                        \n                      );\n      }\n    );/*]]>*/</script>\n    <div>\n      <div>\n        <ul class=\"nav nav-tabs\" id=\"ul636027600\"><li>\n              <a href=\"#tab636027600-0\"><i class=\"fa fa-table\"/></a>\n            </li><li>\n              <a href=\"#tab636027600-1\"><i class=\"fa fa-cubes\"/></a>\n            </li></ul>\n\n        <div class=\"tab-content\" id=\"tab636027600\"><div class=\"tab-pane\" id=\"tab636027600-0\">\n            <div>\n      <script data-this=\"{&quot;dataId&quot;:&quot;anona2833142b172b7046dca0610cc6cfff1&quot;,&quot;dataInit&quot;:[{&quot;name&quot;:null,&quot;timestamp&quot;:&quot;2017-08-09T11:52:44.721Z&quot;,&quot;sink&quot;:&quot;{\\n  \\&quot;description\\&quot; : \\&quot;FileSink[/tmp/learning-spark-streaming]\\&quot;\\n}&quot;,&quot;id&quot;:&quot;ce29c1eb-bebc-45cb-abd8-3e6437c16518&quot;,&quot;durationMs&quot;:&quot;{triggerExecution=377, queryPlanning=2, getBatch=4, getOffset=109, addBatch=236, walCommit=13}&quot;,&quot;stateOperators&quot;:&quot;[Lorg.apache.spark.sql.streaming.StateOperatorProgress;@25a9cb19&quot;,&quot;eventTime&quot;:&quot;{}&quot;,&quot;batchId&quot;:320,&quot;sources&quot;:&quot;[Lorg.apache.spark.sql.streaming.SourceProgress;@5968b1ec&quot;,&quot;runId&quot;:&quot;dcfd946a-097a-41c1-b810-f848baaddc10&quot;}],&quot;genId&quot;:&quot;2039901549&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tableChart'], \n      function(playground, _magictableChart) {\n        // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n        // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n        playground.call(data,\n                        this\n                        ,\n                        {\n    \"f\": _magictableChart,\n    \"o\": {\"headers\":[\"id\",\"runId\",\"name\",\"timestamp\",\"batchId\",\"durationMs\",\"eventTime\",\"stateOperators\",\"sources\",\"sink\"],\"width\":600,\"height\":400}\n  }\n  \n                        \n                        \n                      );\n      }\n    );/*]]>*/</script>\n    <div>\n      <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon564eef1043b7469d6659ec9f7f9fd047&quot;,&quot;initialValue&quot;:&quot;1&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n  ko.applyBindings({\n      value: O.makeObservable(valueId, initialValue)\n    },\n    this\n  );\n});\n        /*]]>*/</script></p> entries total</span>\n      <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon30775eece9075d649d1756475a6b177f&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n  ko.applyBindings({\n      value: O.makeObservable(valueId, initialValue)\n    },\n    this\n  );\n});\n        /*]]>*/</script></p></span>\n      <div>\n      </div>\n    </div></div>\n            </div><div class=\"tab-pane\" id=\"tab636027600-1\">\n            <div>\n      <script data-this=\"{&quot;dataId&quot;:&quot;anon1824fb513d6f33e0b75636fd9ecc8708&quot;,&quot;dataInit&quot;:[{&quot;name&quot;:null,&quot;timestamp&quot;:&quot;2017-08-09T11:52:44.721Z&quot;,&quot;sink&quot;:&quot;{\\n  \\&quot;description\\&quot; : \\&quot;FileSink[/tmp/learning-spark-streaming]\\&quot;\\n}&quot;,&quot;id&quot;:&quot;ce29c1eb-bebc-45cb-abd8-3e6437c16518&quot;,&quot;durationMs&quot;:&quot;{triggerExecution=377, queryPlanning=2, getBatch=4, getOffset=109, addBatch=236, walCommit=13}&quot;,&quot;stateOperators&quot;:&quot;[Lorg.apache.spark.sql.streaming.StateOperatorProgress;@25a9cb19&quot;,&quot;eventTime&quot;:&quot;{}&quot;,&quot;batchId&quot;:320,&quot;sources&quot;:&quot;[Lorg.apache.spark.sql.streaming.SourceProgress;@5968b1ec&quot;,&quot;runId&quot;:&quot;dcfd946a-097a-41c1-b810-f848baaddc10&quot;}],&quot;genId&quot;:&quot;1932819&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/pivotChart'], \n      function(playground, _magicpivotChart) {\n        // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n        // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n        playground.call(data,\n                        this\n                        ,\n                        {\n    \"f\": _magicpivotChart,\n    \"o\": {\"width\":600,\"height\":400,\"derivedAttributes\":{},\"extraOptions\":{}}\n  }\n  \n                        \n                        \n                      );\n      }\n    );/*]]>*/</script>\n    <div>\n      <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anoncd45d6e2ffee9eeb20e241abe6587b39&quot;,&quot;initialValue&quot;:&quot;1&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n  ko.applyBindings({\n      value: O.makeObservable(valueId, initialValue)\n    },\n    this\n  );\n});\n        /*]]>*/</script></p> entries total</span>\n      <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon64be8a6085baad583dc1759992d0a8e7&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n  ko.applyBindings({\n      value: O.makeObservable(valueId, initialValue)\n    },\n    this\n  );\n});\n        /*]]>*/</script></p></span>\n      <div>\n      </div>\n    </div></div>\n            </div></div>\n      </div>\n    </div></div>"
      },
      "output_type" : "execute_result",
      "execution_count" : 26,
      "time" : "Took: 0.572s, at 2017-08-09 13:52"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A8292D947D7E4A889B416F5C2BB158C7"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}